生产者可以理解为是生产数据的,消费者可以理解为处理数据的

1. 为什么要使用生产者和消费者模式

  • 从函数的方面理解:如果我要生产一个数据,然后将这个数据给函数,让函数依赖这个数据进行运算,且这一过程就是同步过程,如果数据生产的很慢,函数就会一直等待,后面的代码也无法执行

  • 从进程的方面理解:在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。(通俗理解:如果数据产生的快,处理的慢,或者产生的慢,消费的快,那么就使生产者和消费者模式)

2. 什么是生产者和消费者模式

生产者和消费者彼此之间不进行直接的通信,而是通过队列来进行通信,所以生产者生产完数据之后不用等待消费者处理,直接扔给队列,消费者不找生产者要数据,而是直接从队列里取,队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

3. 做包子和吃包子例子

  • 例如包子做的快而吃的慢

import time
from multiprocessing import Process
from multiprocessing import Queue


# 生产者
def producer(q):
    for i in range(100):
        q.put('%s.包子' % i)  # 将生产的数据添加到队列中


# 消费者
def consumer(q):
    for i in range(100):
        time.sleep(1)  # 模拟吃包子需要1秒时间
        print('吃了%s' % q.get())  # 从队列中获取生产的数据,不直接先生产者获取


if __name__ == '__main__':
    q = Queue(10)  # 限制队列的长度,因为如果一下子生产了100个数据放进队列中,且消费者进程消费的很慢,那么没有被消费的数据就会一直在内存中等待被消费,一直占用着内存

# 生产者进程 -> 如果生产的很慢想提高生产速度可以开多几个进程进行处理
    p = Process(target=producer, args=(q,))
    p.start()

# 消费者进程 -> 如果消费的很慢想提高消费速度可以开多几个进程进行处理
    c1 = Process(target=consumer, args=(q,))
    c2 = Process(target=consumer, args=(q,))
    c1.start()
    c2.start()

4. 解决生产者和消费者模式中不平衡的问题

  • 当我们循环去获取队列中生产者所生产的值的时候,我们不知道生产者会生产了多少个数据,这样就会造成循环无法结束 q.get() 一直在等待着获取队里中的数据,那么程序就会进入阻塞无法结束程序

# 无法结束循环,程序进入阻塞无法结束程序

import time
from multiprocessing import Process
from multiprocessing import Queue


# 生产者
def producer(q):
    for i in range(50):
        q.put('%s.包子' % i)


# 消费者
def consumer(q):
    while True:  # 一般情况下会使用 while 去获取数据,因为我们不知道生产者会生产多少个数据,但是这样也造成了循环无法结束
        time.sleep(1)
        print('吃了%s' % q.get())  # 循环无法借宿导致 q.get() 一直在等待获取队列中的数据


if __name__ == '__main__':
    q = Queue(10)

# 生产者进程
    p = Process(target=producer, args=(q,))
    p.start()

 # 消费者进程
    c1 = Process(target=consumer, args=(q,))
    c2 = Process(target=consumer, args=(q,))
    c1.start()
    c2.start()

  • 解决方法一 

    • 生产完数据后再向队列中添加多一个标识数据,当消费者循环获取数据的时候就要进行判断,拿到的数据是不是等于标识数据,如果等于就说明消费者已经获取完所有生产者的数据,可以结束循环

    • 存在问题: 如果有100个消费进程,生产完数据后就要往队列添加100个标识数据

import time
from multiprocessing import Process
from multiprocessing import Queue


# 生产者
def producer(q):
    for i in range(50):
        q.put('%s.包子' % i)
# 如果有3个消费进程,那么就要往队列添加3个标识数据

    q.put(None)  # 当生产完数据后再向队列中添加多一个标识数据
    q.put(None)  # 当生产完数据后再向队列中添加多一个标识数据
    q.put(None)  # 当生产完数据后再向队列中添加多一个标识数据


# 消费者
def consumer(q):
    while True:
        time.sleep(1)
        bun = q.get()

        if bun is None:  # 判断拿到的数据是不是等于标识数据,如果等于就说明消费者已经获取完所有生产者的数据,可以结束循环
            break

        print('吃了%s' % bun)


if __name__ == '__main__':
    q = Queue(10)

# 生产者进程
    p = Process(target=producer, args=(q,))
    p.start()

# 消费者进程
    c1 = Process(target=consumer, args=(q,))
    c2 = Process(target=consumer, args=(q,))
    c3 = Process(target=consumer, args=(q,))
    c1.start()
    c2.start()
    c3.start()

  • 解决方法二 -> 使用 JoinableQueue 队列

    • 使用 JoinableQueue 队列的执行顺序:生产者生产的数据全部被消费完 -> 生产者进程结束 -> 主进程代码执行结束 -> 消费者守护进程结束

import time
from multiprocessing import Process
from multiprocessing import JoinableQueue


# 生产者
def producer(q):
    for i in range(50):
        q.put('%s.包子' % i)
    q.join()  # 等待 消费者进程 把所有的数据处理完


# 消费者
def consumer(q):
    while True:
        time.sleep(1)
        bun = q.get()
        print('吃了%s' % bun)
        q.task_done()  # 告诉生产者进程我处理完了这一个数据


if __name__ == '__main__':
    q = JoinableQueue()

 # 生产者进程
    p = Process(target=producer, args=(q,))
    p.start()


 # 消费者进程 -> 所有的消费者进程都要开启守护进程,当主进程的代码执行结束后,消费者进程也随之结束,里面的循环也不会被执行,这样就不用像解决办法一通过标识数据来结束循环
    c1 = Process(target=consumer, args=(q,))
    c1.daemon = True  # 开启守护进程
    c1.start()

    c2 = Process(target=consumer, args=(q,))
    c2.daemon = True  # 开启守护进程
    c2.start()

    c3 = Process(target=consumer, args=(q,))
    c3.daemon = True  # 开启守护进程
    c3.start()

# 主进程中所有的生产者进程都要执行 .join 方法,等待 xxx 生产者进程结束
    p.join()  # 等待 p 生产者进程结束,如果其他的生产者进程,那么也要执行 .join 方法等待 xxx 生产者进程结束